查看原文
其他

数据库中间件 MyCAT 源码分析 —— 跨库两表Join

芋道源码 SpringForAll社区 2020-10-17

摘要: 原创出处 http://www.iocoder.cn/MyCAT/two-table-share-join/ 「芋道源码」欢迎转载,保留摘要,谢谢!

本文主要基于 MyCAT 1.6.5 正式版

  • 1. 概述

  • 2. 主流程

  • \3. ShareJoin

  • 3.1 JoinParser

  • 3.2 ShareJoin.processSQL(...)

  • 3.3 BatchSQLJob

  • 3.4 ShareDBJoinHandler

  • 3.5 ShareRowOutPutDataHandler

  • 4. 彩蛋


1. 概述

MyCAT 支持跨库表 Join,目前版本仅支持跨库表 Join。虽然如此,已经能够满足我们大部分的业务场景。况且,Join 过多的表可能带来的性能问题也是很麻烦的。

本文主要分享:

  1. 整体流程、调用顺序图

  2. 核心代码的分析

前置阅读:《MyCAT 源码分析 —— 【单库单表】查询》。

OK,Let's Go。

2. 主流程

当执行跨库两表 Join SQL 时,经历的大体流程如下:

SQL 上,需要添加注解 /*!mycat:catlet=io.mycat.catlets.ShareJoin */${SQL}RouteService#route(...) 解析注解 mycat:catlet 后,路由给 HintCatletHandler 作进一步处理。

HintCatletHandler 获取注解对应的 Catlet 实现类, io.mycat.catlets.ShareJoin 就是其中一种实现(目前也只有这一种实现),提供了跨库两表 Join 的功能。从类命名上看, ShareJoin 很大可能性后续会提供完整的跨库多表的 Join 功能。

核心代码如下:

  1. // HintCatletHandler.java

  2. public RouteResultset route(SystemConfig sysConfig, SchemaConfig schema,

  3.                           int sqlType, String realSQL, String charset, ServerConnection sc,

  4.                           LayerCachePool cachePool, String hintSQLValue, int hintSqlType, Map hintMap)

  5.       throws SQLNonTransientException {

  6.   String cateletClass = hintSQLValue;

  7.   if (LOGGER.isDebugEnabled()) {

  8.       LOGGER.debug("load catelet class:" + hintSQLValue + " to run sql " + realSQL);

  9.   }

  10.   try {

  11.       Catlet catlet = (Catlet) MycatServer.getInstance().getCatletClassLoader().getInstanceofClass(cateletClass);

  12.       catlet.route(sysConfig, schema, sqlType, realSQL, charset, sc, cachePool);

  13.       catlet.processSQL(realSQL, new EngineCtx(sc.getSession2()));

  14.   } catch (Exception e) {

  15.       LOGGER.warn("catlet error " + e);

  16.       throw new SQLNonTransientException(e);

  17.   }

  18.   return null;

  19. }

3. ShareJoin

目前支持跨库表 Join。 ShareJoin 将 SQL 拆分成左表 SQL 和 右表 SQL,发送给各数据节点执行,汇总数据结果进行合后返回。

伪代码如下:

  1. // SELECT u.id, o.id FROM t_order o

  2. // INNER JOIN t_user u ON o.uid = u.id

  3. // 【顺序】查询左表

  4. String leftSQL = "SELECT o.id, u.id FROM t_order o";

  5. List leftList = dn[0].select(leftSQL) + dn[1].select(leftSQL) + ... + dn[n].select(leftsql);

  6. // 【并行】查询右表

  7. String rightSQL = "SELECT u.id FROM t_user u WHERE u.id IN (${leftList.uid})";

  8. for (dn : dns) { // 此处是并行执行,使用回调逻辑

  9.    for (rightRecord : dn.select(rightSQL)) { // 查询右表

  10.        // 合并结果

  11.        for (leftRecord : leftList) {

  12.            if (leftRecord.uid == rightRecord.id) {

  13.                write(leftRecord + leftRecord.uid 拼接结果);

  14.            }

  15.        }

  16.    }

  17. }

实际情况会更加复杂,我们接下来一点点往下看。

3.1 JoinParser

JoinParser 负责对 SQL 进行解析。整体流程如下:

举个例子, /*!mycat:catlet=io.mycat.catlets.ShareJoin */SELECT o.id,u.usernamefromt_order o join t_user u on o.uid=u.id; 解析后, TableFilter 结果如下:

  • tName :表名

  • tAlia :表自定义命名

  • where :过滤条件

  • order :排序条件

  • parenTable :左连接的 Join 的表名。 t_user表 在 join属性 的 parenTable 为 "o",即 t_order

  • joinParentkey :左连接的 Join 字段

  • joinKey :join 字段。 t_user表 在 join属性 为 id

  • join :子 tableFilter。即,该表连接的右边的表。

  • parent :和 join属性 相对。

看到此处,大家可能有疑问,为什么要把 SQL 解析成 TableFilterJoinParser根据 TableFilter 生成数据节点执行 SQL。代码如下:

  1. // TableFilter.java

  2. public String getSQL() {

  3.   String sql = "";

  4.   // fields

  5.   for (Entry<String, String> entry : fieldAliasMap.entrySet()) {

  6.       String key = entry.getKey();

  7.       String val = entry.getValue();

  8.       if (val == null) {

  9.           sql = unionsql(sql, getFieldfrom(key), ",");

  10.       } else {

  11.           sql = unionsql(sql, getFieldfrom(key) + " as " + val, ",");

  12.       }

  13.   }

  14.   // where

  15.   if (parent == null) {    // on/where 等于号左边的表

  16.       String parentJoinKey = getJoinKey(true);

  17.       // fix sharejoin bug:

  18.       // (AbstractConnection.java:458) -close connection,reason:program err:java.lang.IndexOutOfBoundsException:

  19.       // 原因是左表的select列没有包含 join 列,在获取结果时报上面的错误

  20.       if (sql != null && parentJoinKey != null &&

  21.               !sql.toUpperCase().contains(parentJoinKey.trim().toUpperCase())) {

  22.           sql += ", " + parentJoinKey;

  23.       }

  24.       sql = "select " + sql + " from " + tName;

  25.       if (!(where.trim().equals(""))) {

  26.           sql += " where " + where.trim();

  27.       }

  28.   } else {    // on/where 等于号右边边的表

  29.       if (allField) {

  30.           sql = "select " + sql + " from " + tName;

  31.       } else {

  32.           sql = unionField("select " + joinKey, sql, ",");

  33.           sql = sql + " from " + tName;

  34.           //sql="select "+joinKey+","+sql+" from "+tName;

  35.       }

  36.       if (!(where.trim().equals(""))) {

  37.           sql += " where " + where.trim() + " and (" + joinKey + " in %s )";

  38.       } else {

  39.           sql += " where " + joinKey + " in %s ";

  40.       }

  41.   }

  42.   // order

  43.   if (!(order.trim().equals(""))) {

  44.       sql += " order by " + order.trim();

  45.   }

  46.   // limit

  47.   if (parent == null) {

  48.       if ((rowCount > 0) && (offset > 0)) {

  49.           sql += " limit" + offset + "," + rowCount;

  50.       } else {

  51.           if (rowCount > 0) {

  52.               sql += " limit " + rowCount;

  53.           }

  54.       }

  55.   }

  56.   return sql;

  57. }

  • 当 parent 为空时,即on/where 等于号左边的表。例如: selectid,uidfromt_order

  • 当 parent 不为空时,即on/where 等于号右边的表。例如: selectid,usernamefromt_userwhereidin(1,2,3)

3.2 ShareJoin.processSQL(...)

当 SQL 解析完后,生成左边的表执行的 SQL,发送给对应的数据节点查询数据。大体流程如下:

当 SQL 为 /*!mycat:catlet=io.mycat.catlets.ShareJoin */SELECT o.id,u.usernamefromt_order o join t_user u on o.uid=u.id; 时, sql=getSql()的返回结果为 selectid,uidfromt_order

生成左边的表执行的 SQL 后,顺序顺序顺序发送给对应的数据节点查询数据。具体顺序查询是怎么实现的,我们来看下章 BatchSQLJob

3.3 BatchSQLJob

EngineCtxBatchSQLJob 封装,提供上层两个方法:

  1. executeNativeSQLSequnceJob :顺序(非并发)在每个数据节点执行SQL任务

  2. executeNativeSQLParallJob :并发在每个数据节点执行SQL任务

核心代码如下:

  1. // EngineCtx.java

  2. public void executeNativeSQLSequnceJob(String[] dataNodes, String sql,

  3.        SQLJobHandler jobHandler) {

  4.    for (String dataNode : dataNodes) {

  5.        SQLJob job = new SQLJob(jobId.incrementAndGet(), sql, dataNode,

  6.                jobHandler, this);

  7.        bachJob.addJob(job, false);

  8.    }

  9. }

  10. public void executeNativeSQLParallJob(String[] dataNodes, String sql,

  11.        SQLJobHandler jobHandler) {

  12.    for (String dataNode : dataNodes) {

  13.        SQLJob job = new SQLJob(jobId.incrementAndGet(), sql, dataNode,

  14.                jobHandler, this);

  15.        bachJob.addJob(job, true);

  16.    }

  17. }


BatchSQLJob 通过执行中任务列表待执行任务列表来实现顺序/并发执行任务。核心代码如下:

  1. // BatchSQLJob.java

  2. /**

  3. * 执行中任务列表

  4. */

  5. private ConcurrentHashMap<Integer, SQLJob> runningJobs = new ConcurrentHashMap<Integer, SQLJob>();

  6. /**

  7. * 待执行任务列表

  8. */

  9. private ConcurrentLinkedQueue<SQLJob> waitingJobs = new ConcurrentLinkedQueue<SQLJob>();

  10. public void addJob(SQLJob newJob, boolean parallExecute) {

  11.   if (parallExecute) {

  12.       runJob(newJob);

  13.   } else {

  14.       waitingJobs.offer(newJob);

  15.       if (runningJobs.isEmpty()) { // 若无正在执行中的任务,则从等待队列里获取任务进行执行。

  16.           SQLJob job = waitingJobs.poll();

  17.           if (job != null) {

  18.               runJob(job);

  19.           }

  20.       }

  21.   }

  22. }

  23. public boolean jobFinished(SQLJob sqlJob) {

  24.    runningJobs.remove(sqlJob.getId());

  25.    SQLJob job = waitingJobs.poll();

  26.    if (job != null) {

  27.        runJob(job);

  28.        return false;

  29.    } else {

  30.        if (noMoreJobInput) {

  31.            return runningJobs.isEmpty() && waitingJobs.isEmpty();

  32.        } else {

  33.            return false;

  34.        }

  35.    }

  36. }

  • 顺序执行时,当 runningJobs 存在执行中的任务时, #addJob(...) 时,不立即执行,添加到 waitingJobs。当 SQLJob 完成时,顺序调用下一个任务。

  • 并发执行时, #addJob(...) 时,立即执行。


SQLJob SQL 异步执行任务。其 jobHandler(SQLJobHandler) 属性,在 SQL 执行有返回结果时,会进行回调,从而实现异步执行。

ShareJoin 里, SQLJobHandler 有两个实现: ShareDBJoinHandlerShareRowOutPutDataHandler。前者,左边的表执行的 SQL 回调;后者,右边的表执行的 SQL 回调。

3.4 ShareDBJoinHandler

ShareDBJoinHandler左边的表执行的 SQL 回调。流程如下:

  • #fieldEofResponse(...) :接收数据节点返回的 fields,放入内存。

  • #rowResponse(...) :接收数据节点返回的 row,放入内存。

  • #rowEofResponse(...) :接收完一个数据节点返回所有的 row。当所有数据节点都完成 SQL 执行时,提交右边的表执行的 SQL 任务,并行执行,即图中#createQryJob(...)

当 SQL 为 /*!mycat:catlet=io.mycat.catlets.ShareJoin */SELECT o.id,u.usernamefromt_order o join t_user u on o.uid=u.id; 时, sql=getChildSQL() 的返回结果为 selectid,usernamefromt_userwhereidin(1,2,3)

核心代码如下:

  1. // ShareJoin.java

  2. private void createQryJob(int batchSize) {

  3.   int count = 0;

  4.   Map<String, byte[]> batchRows = new ConcurrentHashMap<String, byte[]>();

  5.   String theId = null;

  6.   StringBuilder sb = new StringBuilder().append('(');

  7.   String svalue = "";

  8.   for (Map.Entry<String, String> e : ids.entrySet()) {

  9.       theId = e.getKey();

  10.       byte[] rowbyte = rows.remove(theId);

  11.       if (rowbyte != null) {

  12.           batchRows.put(theId, rowbyte);

  13.       }

  14.       if (!svalue.equals(e.getValue())) {

  15.           if (joinKeyType == Fields.FIELD_TYPE_VAR_STRING

  16.                   || joinKeyType == Fields.FIELD_TYPE_STRING) { // joinkey 为varchar

  17.               sb.append("'").append(e.getValue()).append("'").append(','); // ('digdeep','yuanfang')

  18.           } else { // 默认joinkey为int/long

  19.               sb.append(e.getValue()).append(','); // (1,2,3)

  20.           }

  21.       }

  22.       svalue = e.getValue();

  23.       if (count++ > batchSize) {

  24.           break;

  25.       }

  26.   }

  27.   if (count == 0) {

  28.       return;

  29.   }

  30.   jointTableIsData = true;

  31.   sb.deleteCharAt(sb.length() - 1).append(')');

  32.   String sql = String.format(joinParser.getChildSQL(), sb);

  33.   getRoute(sql);

  34.   ctx.executeNativeSQLParallJob(getDataNodes(), sql, new ShareRowOutPutDataHandler(this, fields, joinindex, joinParser.getJoinRkey(), batchRows, ctx.getSession()));

  35. }

3.5 ShareRowOutPutDataHandler

ShareRowOutPutDataHandler右边的表执行的 SQL 回调。流程如下:

  • #fieldEofResponse(...) :接收数据节点返回的 fields,返回 header 给 MySQL Client。

  • #rowResponse(...) :接收数据节点返回的 row,匹配左表的记录,返回合并后返回的 row 给 MySQL Client。

  • #rowEofResponse(...) :当所有 row 都返回完后,返回 eof 给 MySQL Client。

核心代码如下:

  1. // ShareRowOutPutDataHandler.java

  2. public boolean onRowData(String dataNode, byte[] rowData) {

  3.   RowDataPacket rowDataPkgold = ResultSetUtil.parseRowData(rowData, bfields);

  4.   //拷贝一份batchRows

  5.   Map<String, byte[]> batchRowsCopy = new ConcurrentHashMap<String, byte[]>();

  6.   batchRowsCopy.putAll(arows);

  7.   // 获取Id字段,

  8.   String id = ByteUtil.getString(rowDataPkgold.fieldValues.get(joinR));

  9.   // 查找ID对应的A表的记录

  10.   byte[] arow = getRow(batchRowsCopy, id, joinL);

  11.   while (arow != null) {

  12.       RowDataPacket rowDataPkg = ResultSetUtil.parseRowData(arow, afields);//ctx.getAllFields());

  13.       for (int i = 1; i < rowDataPkgold.fieldCount; i++) {

  14.           // 设置b.name 字段

  15.           byte[] bname = rowDataPkgold.fieldValues.get(i);

  16.           rowDataPkg.add(bname);

  17.           rowDataPkg.addFieldCount(1);

  18.       }

  19.       // huangyiming add

  20.       MiddlerResultHandler middlerResultHandler = session.getMiddlerResultHandler();

  21.       if (null == middlerResultHandler) {

  22.           ctx.writeRow(rowDataPkg);

  23.       } else {

  24.           if (middlerResultHandler instanceof MiddlerQueryResultHandler) {

  25.               byte[] columnData = rowDataPkg.fieldValues.get(0);

  26.               if (columnData != null && columnData.length > 0) {

  27.                   String rowValue = new String(columnData);

  28.                   middlerResultHandler.add(rowValue);

  29.               }

  30.               //}

  31.           }

  32.       }

  33.       arow = getRow(batchRowsCopy, id, joinL);

  34.   }

  35.   return false;

  36. }



推荐: 【SFA官方翻译】:Spring中的组件扫描

上一篇: 【SFA官方翻译】Spring Boot中配置Tomcat连接池

关注公众号


    您可能也对以下帖子感兴趣

    文章有问题?点此查看未经处理的缓存